package com.isyscore.os.etl.manager;

import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.io.resource.ClassPathResource;
import cn.hutool.core.util.StrUtil;
import cn.hutool.http.HttpRequest;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.isyscore.device.common.util.JsonMapper;
import com.isyscore.os.core.exception.DataFactoryException;
import com.isyscore.os.core.exception.ErrorCode;
import com.isyscore.os.etl.constant.FlinkApiConstant;
import com.isyscore.os.etl.model.FlinkSqlJobJarInfo;
import com.isyscore.os.etl.model.dto.FlinkSqlJarInfoDTO;
import com.isyscore.os.etl.service.FlinkSqlJobJarInfoService;
import com.isyscore.os.etl.service.JobRunLogService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.core.io.Resource;
import org.springframework.core.io.support.PathMatchingResourcePatternResolver;
import org.springframework.stereotype.Service;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.LocalDateTime;
import java.util.Base64;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import static com.isyscore.os.etl.constant.FlinkApiConstant.API_RESULT_JOBID_PROP_NAME;

/**
 * @author wany
 * 用于管理上传远程执行flink sql所必须的jar包
 */
@Service
@Slf4j
public class FlinkSqlExecutorJarManager {

    @Autowired
    private FlinkSqlJobJarInfoService flinkSqlJobJarInfoService;

    @Autowired
    private JobRunLogService jobRunLogService;

    /** Serializes jar preparation so concurrent submits cannot upload duplicate jars. */
    private final Lock lock = new ReentrantLock();

    /** Base REST URL of the default Flink cluster, e.g. "http://host:port" — assumed to carry the scheme; TODO confirm against config. */
    @Value("${flink-config.host}:${flink-config.port}")
    private String flinkConfigRestApiUrl;

    /**
     * Jar id of the most recently prepared executor jar. Kept only for backward
     * compatibility; it is written by concurrent preparations, so callers must not
     * rely on it — {@link #prepareUploadFlinkJar(String)} returns the id instead.
     */
    private volatile String flinkSqlExecutorJarId;

    /** Entry class of the executor jar submitted to Flink. */
    private static final String FLINK_SQL_EXECUTOR_MAIN_CLASS = "com.isyscore.os.flinksql.FlinkSqlJobApplication";

    /**
     * Ensures the bundled flink-sql executor jar is uploaded to the given Flink cluster,
     * re-uploading when there is no record, the bundled version changed, or the jar is
     * missing on the remote side. Must be called while holding {@link #lock}.
     *
     * @param flinkRestApiUrl normalized Flink REST base URL
     * @return the Flink-side jar id to use for job submission
     * @throws IOException if the bundled jar resource cannot be resolved
     */
    private String prepareUploadFlinkJar(String flinkRestApiUrl) throws IOException {
        FlinkSqlJobJarInfo jarInfo = flinkSqlJobJarInfoService.getOne(
                Wrappers.lambdaQuery(FlinkSqlJobJarInfo.class)
                        .eq(FlinkSqlJobJarInfo::getFlinkAddress, flinkRestApiUrl), false);
        String jarId;
        if (jarInfo == null) {
            // No upload record for this cluster -> upload the bundled jar and record it.
            jarInfo = uploadAndRecordJar(flinkRestApiUrl);
            jarId = jarInfo.getFlinkJarId();
        } else {
            FlinkSqlJarInfoDTO currentJarInfo = getCurrentFlinkJar();
            if (!currentJarInfo.getJarVersion().equalsIgnoreCase(jarInfo.getJarVersion())) {
                // Bundled jar version differs from the recorded one -> replace the record and re-upload.
                flinkSqlJobJarInfoService.remove(Wrappers.lambdaQuery(FlinkSqlJobJarInfo.class)
                        .eq(FlinkSqlJobJarInfo::getFlinkJarId, jarInfo.getFlinkJarId()));
                jarId = uploadFlinkJar(currentJarInfo.getJarResource(), flinkRestApiUrl);
                jarInfo.setFlinkJarId(jarId);
                jarInfo.setUploadTime(LocalDateTime.now());
                jarInfo.setJarVersion(currentJarInfo.getJarVersion());
                flinkSqlJobJarInfoService.save(jarInfo);
            } else {
                jarId = jarInfo.getFlinkJarId();
            }
        }
        // The DB record may be stale (e.g. Flink restarted and lost its jars):
        // verify the jar actually exists on the cluster and re-upload if not.
        if (!checkRemoteJarExist(jarInfo, flinkRestApiUrl)) {
            flinkSqlJobJarInfoService.remove(Wrappers.lambdaQuery(FlinkSqlJobJarInfo.class)
                    .eq(FlinkSqlJobJarInfo::getFlinkJarId, jarInfo.getFlinkJarId()));
            jarInfo = uploadAndRecordJar(flinkRestApiUrl);
            jarId = jarInfo.getFlinkJarId();
        }
        // Kept for backward compatibility only; the returned value is authoritative.
        flinkSqlExecutorJarId = jarId;
        return jarId;
    }

    /**
     * Uploads the bundled executor jar to the given cluster and persists a fresh record.
     *
     * @return the saved record, with {@code flinkJarId} set to the remote jar id
     * @throws IOException if the bundled jar resource cannot be resolved
     */
    private FlinkSqlJobJarInfo uploadAndRecordJar(String flinkRestApiUrl) throws IOException {
        FlinkSqlJarInfoDTO currentJarInfo = getCurrentFlinkJar();
        String jarId = uploadFlinkJar(currentJarInfo.getJarResource(), flinkRestApiUrl);
        FlinkSqlJobJarInfo jarInfo = new FlinkSqlJobJarInfo();
        jarInfo.setJarVersion(currentJarInfo.getJarVersion());
        jarInfo.setFlinkJarId(jarId);
        jarInfo.setUploadTime(LocalDateTime.now());
        jarInfo.setFlinkAddress(flinkRestApiUrl);
        flinkSqlJobJarInfoService.save(jarInfo);
        return jarInfo;
    }

    /**
     * Uploads the jar via Flink's {@code POST /jars/upload} endpoint.
     *
     * @param jarResource the bundled jar to upload
     * @param flinkRestApiUrl normalized Flink REST base URL
     * @return the jar id (last path segment of the "filename" returned by Flink)
     * @throws DataFactoryException if Flink does not report a success status
     */
    private String uploadFlinkJar(cn.hutool.core.io.resource.Resource jarResource, String flinkRestApiUrl) {
        String jsonRes = HttpRequest.post(flinkRestApiUrl + "/jars/upload")
                .contentType("application/x-java-archive")
                .form("jarfile", jarResource)
                .execute().body();
        JSONObject jo = JSON.parseObject(jsonRes);
        log.info("upload flink-sql executor jar result: {}", jo.toJSONString());
        if (FlinkApiConstant.API_RESULT_STATUS_SUCCESS.equalsIgnoreCase(jo.getString(FlinkApiConstant.API_RESULT_STATUS_PROP_NAME))) {
            // Flink returns the stored path; the jar id used by other endpoints is its last segment.
            String fileName = jo.getString("filename");
            return fileName.substring(fileName.lastIndexOf('/') + 1);
        } else {
            throw new DataFactoryException(ErrorCode.INTERNAL_ERROR, "upload flink-sql executor jar failed");
        }
    }

    /**
     * Locates the executor jar bundled on the classpath under {@code jars/} and
     * extracts its version from the file name (text between the last '-' and the
     * last '.'; NOTE(review): a version like "1.0-SNAPSHOT" would be truncated —
     * confirm the packaging naming scheme).
     *
     * @throws IOException if classpath scanning fails
     * @throws DataFactoryException if no matching jar is found
     */
    private FlinkSqlJarInfoDTO getCurrentFlinkJar() throws IOException {
        PathMatchingResourcePatternResolver resolver = new PathMatchingResourcePatternResolver();
        Resource[] resources = resolver.getResources("classpath:jars/isc-flink-sql-executor-*.jar");
        if (resources.length == 0) {
            throw new DataFactoryException(ErrorCode.INTERNAL_ERROR, "can not find flink-sql executor jar");
        }
        String jarName = resources[0].getFilename();
        if (StrUtil.isBlank(jarName)) {
            // getFilename() may return null (e.g. non-file resources); fail explicitly instead of NPE.
            throw new DataFactoryException(ErrorCode.INTERNAL_ERROR, "can not find flink-sql executor jar");
        }
        ClassPathResource jarResource = new ClassPathResource("jars/" + jarName);
        String jarVersion = jarName.substring(jarName.lastIndexOf("-") + 1, jarName.lastIndexOf("."));
        FlinkSqlJarInfoDTO jarInfoDTO = new FlinkSqlJarInfoDTO();
        jarInfoDTO.setJarResource(jarResource);
        jarInfoDTO.setJarVersion(jarVersion);
        return jarInfoDTO;
    }

    /**
     * Submits a SQL job to Flink via {@code POST /jars/{jarId}/run}.
     *
     * @param sql SQL to execute; passed to the executor Base64-encoded
     * @param classpaths optional extra classpath URLs, joined as the "-C" argument
     * @param flinkServiceAddress target cluster address; falls back to the configured default when blank
     * @param parallelism Flink job parallelism
     * @param localLog accumulator for the human-readable run log (mutated)
     * @param jobRunLogId id of the persisted run log, updated on failure
     * @return the Flink job id
     * @throws DataFactoryException if the jar cannot be prepared or Flink returns no job id
     */
    public String submitSqlJob(String sql, List<String> classpaths, String flinkServiceAddress, int parallelism, StringBuilder localLog, String jobRunLogId) {
        String flinkRestApiUrl = normalizeFlinkUrl(flinkServiceAddress);
        // Resolve the jar id into a local: reading the shared field after unlock
        // would race with concurrent submits (possibly to other clusters).
        String jarId;
        lock.lock();
        try {
            jarId = prepareUploadFlinkJar(flinkRestApiUrl);
        } catch (IOException e) {
            // Preserve the root cause in the log; DataFactoryException carries only the message.
            log.error("upload flink-sql executor jar failed for {}", flinkRestApiUrl, e);
            throw new DataFactoryException(ErrorCode.INTERNAL_ERROR, "执行SQL任务失败:" + "jar上传失败");
        } finally {
            lock.unlock();
        }
        String base64Sql = Base64.getEncoder().encodeToString(sql.getBytes(StandardCharsets.UTF_8));
        String apiUrl = flinkRestApiUrl + "/jars/" + jarId + "/run";
        Map<String, Object> params = new HashMap<>();
        StringBuilder programArgs = new StringBuilder();
        programArgs.append("-sql")
                .append(" ")
                .append(base64Sql);
        if (classpaths != null && !classpaths.isEmpty()) {
            programArgs.append(" ").append("-C")
                    .append(" ")
                    .append(StrUtil.join(",", classpaths));
        }
        params.put("entryClass", FLINK_SQL_EXECUTOR_MAIN_CLASS);
        params.put("programArgs", programArgs.toString());
        params.put("parallelism", parallelism);
        String jsonRes = HttpRequest.post(apiUrl).body(JsonMapper.toAlwaysJson(params)).execute().body();
        JSONObject jo = JSON.parseObject(jsonRes);
        log.info("submit flink-sql job result: {}", jo.toJSONString());
        localLog.append("启动命令：").append("\n");
        localLog.append("url：").append(apiUrl).append("\n");
        localLog.append("body：").append(JsonMapper.toAlwaysJson(params)).append("\n");
        localLog.append("result：").append(jo.toJSONString()).append("\n");
        if (StrUtil.isBlank(jo.getString(API_RESULT_JOBID_PROP_NAME))) {
            localLog.append("执行SQL任务失败，result=").append(jsonRes).append("\n");
            jobRunLogService.updateLogById(localLog.toString(), jobRunLogId);
            throw new DataFactoryException(ErrorCode.INTERNAL_ERROR, "执行SQL任务失败:" + jsonRes);
        }
        return jo.getString(API_RESULT_JOBID_PROP_NAME);
    }

    /**
     * Normalizes a user-supplied Flink address: prepends "http://" when no scheme is
     * present and strips a trailing slash; blank input falls back to the configured default.
     */
    private String normalizeFlinkUrl(String flinkServiceAddress) {
        if (StrUtil.isBlank(flinkServiceAddress)) {
            return this.flinkConfigRestApiUrl;
        }
        String url = (flinkServiceAddress.startsWith("http://") || flinkServiceAddress.startsWith("https://"))
                ? flinkServiceAddress
                : "http://" + flinkServiceAddress;
        if (url.endsWith("/")) {
            url = url.substring(0, url.length() - 1);
        }
        return url;
    }

    /**
     * Checks via {@code GET /jars} whether the recorded jar id still exists on the cluster.
     *
     * @return true if a jar with the recorded id is listed, false otherwise
     */
    private boolean checkRemoteJarExist(FlinkSqlJobJarInfo jarInfo, String flinkRestApiUrl) {
        String apiUrl = flinkRestApiUrl + "/jars";
        String jsonRes = HttpRequest.get(apiUrl).execute().body();
        JSONObject jo = JSON.parseObject(jsonRes);
        JSONArray array = jo.getJSONArray("files");
        if (CollectionUtil.isEmpty(array)) {
            return false;
        }
        for (int i = 0; i < array.size(); i++) {
            if (jarInfo.getFlinkJarId().equals(array.getJSONObject(i).getString("id"))) {
                return true;
            }
        }
        return false;
    }
}
